home *** CD-ROM | disk | FTP | other *** search
/ PC World Komputer 2010 April / PCWorld0410.iso / hity wydania / Ubuntu 9.10 PL / karmelkowy-koliberek-9.10-netbook-remix-PL.iso / casper / filesystem.squashfs / usr / lib / python2.6 / multiprocessing / pool.pyc (.txt) < prev    next >
Encoding:
Python Compiled Bytecode  |  2009-11-11  |  18.1 KB  |  634 lines

  1. # Source Generated with Decompyle++
  2. # File: in.pyc (Python 2.6)
  3.  
  4. __all__ = [
  5.     'Pool']
  6. import threading
  7. import Queue
  8. import itertools
  9. import collections
  10. import time
  11. from multiprocessing import Process, cpu_count, TimeoutError
  12. from multiprocessing.util import Finalize, debug
  13. RUN = 0
  14. CLOSE = 1
  15. TERMINATE = 2
  16. job_counter = itertools.count()
  17.  
  18. def mapstar(args):
  19.     return map(*args)
  20.  
  21.  
  22. def worker(inqueue, outqueue, initializer = None, initargs = ()):
  23.     put = outqueue.put
  24.     get = inqueue.get
  25.     if hasattr(inqueue, '_writer'):
  26.         inqueue._writer.close()
  27.         outqueue._reader.close()
  28.     
  29.     if initializer is not None:
  30.         initializer(*initargs)
  31.     
  32.     while None:
  33.         
  34.         try:
  35.             task = get()
  36.         except (EOFError, IOError):
  37.             debug('worker got EOFError or IOError -- exiting')
  38.             break
  39.  
  40.         if task is None:
  41.             debug('worker got sentinel -- exiting')
  42.             break
  43.         
  44.         (job, i, func, args, kwds) = task
  45.         
  46.         try:
  47.             result = (True, func(*args, **kwds))
  48.         except Exception:
  49.             e = None
  50.             result = (False, e)
  51.  
  52.         continue
  53.         return None
  54.  
  55.  
  56. class Pool(object):
  57.     '''
  58.     Class which supports an async version of the `apply()` builtin
  59.     '''
  60.     Process = Process
  61.     
  62.     def __init__(self, processes = None, initializer = None, initargs = ()):
  63.         self._setup_queues()
  64.         self._taskqueue = Queue.Queue()
  65.         self._cache = { }
  66.         self._state = RUN
  67.         if processes is None:
  68.             
  69.             try:
  70.                 processes = cpu_count()
  71.             except NotImplementedError:
  72.                 processes = 1
  73.             except:
  74.                 None<EXCEPTION MATCH>NotImplementedError
  75.             
  76.  
  77.         None<EXCEPTION MATCH>NotImplementedError
  78.         self._pool = []
  79.         for i in range(processes):
  80.             w = self.Process(target = worker, args = (self._inqueue, self._outqueue, initializer, initargs))
  81.             self._pool.append(w)
  82.             w.name = w.name.replace('Process', 'PoolWorker')
  83.             w.daemon = True
  84.             w.start()
  85.         
  86.         self._task_handler = threading.Thread(target = Pool._handle_tasks, args = (self._taskqueue, self._quick_put, self._outqueue, self._pool))
  87.         self._task_handler.daemon = True
  88.         self._task_handler._state = RUN
  89.         self._task_handler.start()
  90.         self._result_handler = threading.Thread(target = Pool._handle_results, args = (self._outqueue, self._quick_get, self._cache))
  91.         self._result_handler.daemon = True
  92.         self._result_handler._state = RUN
  93.         self._result_handler.start()
  94.         self._terminate = Finalize(self, self._terminate_pool, args = (self._taskqueue, self._inqueue, self._outqueue, self._pool, self._task_handler, self._result_handler, self._cache), exitpriority = 15)
  95.  
  96.     
  97.     def _setup_queues(self):
  98.         SimpleQueue = SimpleQueue
  99.         import queues
  100.         self._inqueue = SimpleQueue()
  101.         self._outqueue = SimpleQueue()
  102.         self._quick_put = self._inqueue._writer.send
  103.         self._quick_get = self._outqueue._reader.recv
  104.  
  105.     
  106.     def apply(self, func, args = (), kwds = { }):
  107.         '''
  108.         Equivalent of `apply()` builtin
  109.         '''
  110.         if not self._state == RUN:
  111.             raise AssertionError
  112.         return self.apply_async(func, args, kwds).get()
  113.  
  114.     
  115.     def map(self, func, iterable, chunksize = None):
  116.         '''
  117.         Equivalent of `map()` builtin
  118.         '''
  119.         if not self._state == RUN:
  120.             raise AssertionError
  121.         return self.map_async(func, iterable, chunksize).get()
  122.  
  123.     
  124.     def imap(self, func, iterable, chunksize = 1):
  125.         '''
  126.         Equivalent of `itertools.imap()` -- can be MUCH slower than `Pool.map()`
  127.         '''
  128.         if not self._state == RUN:
  129.             raise AssertionError
  130.         if chunksize == 1:
  131.             result = IMapIterator(self._cache)
  132.             (self._state == RUN, self._taskqueue.put)(((lambda .0: for i, x in .0:
  133. (result._job, i, func, (x,), { }))(enumerate(iterable)), result._set_length))
  134.             return result
  135.         if not chunksize > 1:
  136.             raise AssertionError
  137.         task_batches = Pool._get_tasks(func, iterable, chunksize)
  138.         result = IMapIterator(self._cache)
  139.         (self._taskqueue.put,)(((lambda .0: for i, x in .0:
  140. (result._job, i, mapstar, (x,), { }))(enumerate(task_batches)), result._set_length))
  141.         return (lambda .0: for chunk in .0:
  142. for item in chunk:
  143. item)(result)
  144.  
  145.     
  146.     def imap_unordered(self, func, iterable, chunksize = 1):
  147.         '''
  148.         Like `imap()` method but ordering of results is arbitrary
  149.         '''
  150.         if not self._state == RUN:
  151.             raise AssertionError
  152.         if chunksize == 1:
  153.             result = IMapUnorderedIterator(self._cache)
  154.             (self._state == RUN, self._taskqueue.put)(((lambda .0: for i, x in .0:
  155. (result._job, i, func, (x,), { }))(enumerate(iterable)), result._set_length))
  156.             return result
  157.         if not chunksize > 1:
  158.             raise AssertionError
  159.         task_batches = Pool._get_tasks(func, iterable, chunksize)
  160.         result = IMapUnorderedIterator(self._cache)
  161.         (self._taskqueue.put,)(((lambda .0: for i, x in .0:
  162. (result._job, i, mapstar, (x,), { }))(enumerate(task_batches)), result._set_length))
  163.         return (lambda .0: for chunk in .0:
  164. for item in chunk:
  165. item)(result)
  166.  
  167.     
  168.     def apply_async(self, func, args = (), kwds = { }, callback = None):
  169.         '''
  170.         Asynchronous equivalent of `apply()` builtin
  171.         '''
  172.         if not self._state == RUN:
  173.             raise AssertionError
  174.         result = ApplyResult(self._cache, callback)
  175.         self._taskqueue.put(([
  176.             (result._job, None, func, args, kwds)], None))
  177.         return result
  178.  
  179.     
  180.     def map_async(self, func, iterable, chunksize = None, callback = None):
  181.         '''
  182.         Asynchronous equivalent of `map()` builtin
  183.         '''
  184.         if not self._state == RUN:
  185.             raise AssertionError
  186.         if not hasattr(iterable, '__len__'):
  187.             iterable = list(iterable)
  188.         
  189.         if chunksize is None:
  190.             (chunksize, extra) = divmod(len(iterable), len(self._pool) * 4)
  191.             if extra:
  192.                 chunksize += 1
  193.             
  194.         
  195.         task_batches = Pool._get_tasks(func, iterable, chunksize)
  196.         result = MapResult(self._cache, chunksize, len(iterable), callback)
  197.         (self._taskqueue.put,)(((lambda .0: for i, x in .0:
  198. (result._job, i, mapstar, (x,), { }))(enumerate(task_batches)), None))
  199.         return result
  200.  
  201.     
  202.     def _handle_tasks(taskqueue, put, outqueue, pool):
  203.         thread = threading.current_thread()
  204.         for taskseq, set_length in iter(taskqueue.get, None):
  205.             i = -1
  206.             for i, task in enumerate(taskseq):
  207.                 if thread._state:
  208.                     debug('task handler found thread._state != RUN')
  209.                     break
  210.                 
  211.                 
  212.                 try:
  213.                     put(task)
  214.                 continue
  215.                 except IOError:
  216.                     debug('could not put task on queue')
  217.                     break
  218.                     continue
  219.                 
  220.  
  221.             elif set_length:
  222.                 debug('doing set_length()')
  223.                 set_length(i + 1)
  224.                 continue
  225.             None<EXCEPTION MATCH>IOError
  226.         
  227.         
  228.         try:
  229.             debug('task handler sending sentinel to result handler')
  230.             outqueue.put(None)
  231.             debug('task handler sending sentinel to workers')
  232.             for p in pool:
  233.                 put(None)
  234.         except IOError:
  235.             debug('task handler got IOError when sending sentinels')
  236.  
  237.         debug('task handler exiting')
  238.  
  239.     _handle_tasks = staticmethod(_handle_tasks)
  240.     
  241.     def _handle_results(outqueue, get, cache):
  242.         thread = threading.current_thread()
  243.         while None:
  244.             
  245.             try:
  246.                 task = get()
  247.             except (IOError, EOFError):
  248.                 debug('result handler got EOFError/IOError -- exiting')
  249.                 return None
  250.  
  251.             if thread._state:
  252.                 if not thread._state == TERMINATE:
  253.                     raise AssertionError
  254.                 debug('result handler found thread._state=TERMINATE')
  255.                 break
  256.             
  257.             if task is None:
  258.                 debug('result handler got sentinel')
  259.                 break
  260.             
  261.             (job, i, obj) = task
  262.             
  263.             try:
  264.                 cache[job]._set(i, obj)
  265.             continue
  266.             except KeyError:
  267.                 continue
  268.             
  269.  
  270.             while cache and thread._state != TERMINATE:
  271.                 
  272.                 try:
  273.                     task = get()
  274.                 except (IOError, EOFError):
  275.                     None<EXCEPTION MATCH>KeyError
  276.                     None<EXCEPTION MATCH>KeyError
  277.                     debug('result handler got EOFError/IOError -- exiting')
  278.                     return None
  279.  
  280.                 if task is None:
  281.                     debug('result handler ignoring extra sentinel')
  282.                     continue
  283.                 
  284.                 (job, i, obj) = task
  285.                 
  286.                 try:
  287.                     cache[job]._set(i, obj)
  288.                 continue
  289.                 except KeyError:
  290.                     continue
  291.                 
  292.  
  293.                 None<EXCEPTION MATCH>KeyError
  294.             if hasattr(outqueue, '_reader'):
  295.                 debug('ensuring that outqueue is not full')
  296.                 
  297.                 try:
  298.                     for i in range(10):
  299.                         if not outqueue._reader.poll():
  300.                             break
  301.                         
  302.                         get()
  303.                 except (IOError, EOFError):
  304.                     pass
  305.                 except:
  306.                     None<EXCEPTION MATCH>(IOError, EOFError)
  307.                 
  308.  
  309.         None<EXCEPTION MATCH>(IOError, EOFError)
  310.         debug('result handler exiting: len(cache)=%s, thread._state=%s', len(cache), thread._state)
  311.  
  312.     _handle_results = staticmethod(_handle_results)
  313.     
  314.     def _get_tasks(func, it, size):
  315.         it = iter(it)
  316.         while None:
  317.             x = tuple(itertools.islice(it, size))
  318.             if not x:
  319.                 return None
  320.             yield (func, x)
  321.             continue
  322.             return None
  323.  
  324.     _get_tasks = staticmethod(_get_tasks)
  325.     
  326.     def __reduce__(self):
  327.         raise NotImplementedError('pool objects cannot be passed between processes or pickled')
  328.  
  329.     
  330.     def close(self):
  331.         debug('closing pool')
  332.         if self._state == RUN:
  333.             self._state = CLOSE
  334.             self._taskqueue.put(None)
  335.         
  336.  
  337.     
  338.     def terminate(self):
  339.         debug('terminating pool')
  340.         self._state = TERMINATE
  341.         self._terminate()
  342.  
  343.     
  344.     def join(self):
  345.         debug('joining pool')
  346.         if not self._state in (CLOSE, TERMINATE):
  347.             raise AssertionError
  348.         self._task_handler.join()
  349.         self._result_handler.join()
  350.         for p in self._pool:
  351.             p.join()
  352.         
  353.  
  354.     
  355.     def _help_stuff_finish(inqueue, task_handler, size):
  356.         debug('removing tasks from inqueue until task handler finished')
  357.         inqueue._rlock.acquire()
  358.         while task_handler.is_alive() and inqueue._reader.poll():
  359.             inqueue._reader.recv()
  360.             time.sleep(0)
  361.  
  362.     _help_stuff_finish = staticmethod(_help_stuff_finish)
  363.     
  364.     def _terminate_pool(cls, taskqueue, inqueue, outqueue, pool, task_handler, result_handler, cache):
  365.         debug('finalizing pool')
  366.         task_handler._state = TERMINATE
  367.         taskqueue.put(None)
  368.         debug('helping task handler/workers to finish')
  369.         cls._help_stuff_finish(inqueue, task_handler, len(pool))
  370.         if not result_handler.is_alive() and len(cache) == 0:
  371.             raise AssertionError
  372.         result_handler._state = TERMINATE
  373.         outqueue.put(None)
  374.         if pool and hasattr(pool[0], 'terminate'):
  375.             debug('terminating workers')
  376.             for p in pool:
  377.                 p.terminate()
  378.             
  379.         
  380.         debug('joining task handler')
  381.         task_handler.join(1e+100)
  382.         debug('joining result handler')
  383.         result_handler.join(1e+100)
  384.         if pool and hasattr(pool[0], 'terminate'):
  385.             debug('joining pool workers')
  386.             for p in pool:
  387.                 p.join()
  388.             
  389.         
  390.  
  391.     _terminate_pool = classmethod(_terminate_pool)
  392.  
  393.  
  394. class ApplyResult(object):
  395.     
  396.     def __init__(self, cache, callback):
  397.         self._cond = threading.Condition(threading.Lock())
  398.         self._job = job_counter.next()
  399.         self._cache = cache
  400.         self._ready = False
  401.         self._callback = callback
  402.         cache[self._job] = self
  403.  
  404.     
  405.     def ready(self):
  406.         return self._ready
  407.  
  408.     
  409.     def successful(self):
  410.         if not self._ready:
  411.             raise AssertionError
  412.         return self._success
  413.  
  414.     
  415.     def wait(self, timeout = None):
  416.         self._cond.acquire()
  417.         
  418.         try:
  419.             if not self._ready:
  420.                 self._cond.wait(timeout)
  421.         finally:
  422.             self._cond.release()
  423.  
  424.  
  425.     
  426.     def get(self, timeout = None):
  427.         self.wait(timeout)
  428.         if not self._ready:
  429.             raise TimeoutError
  430.         self._ready
  431.         if self._success:
  432.             return self._value
  433.         raise self._value
  434.  
  435.     
  436.     def _set(self, i, obj):
  437.         (self._success, self._value) = obj
  438.         if self._callback and self._success:
  439.             self._callback(self._value)
  440.         
  441.         self._cond.acquire()
  442.         
  443.         try:
  444.             self._ready = True
  445.             self._cond.notify()
  446.         finally:
  447.             self._cond.release()
  448.  
  449.         del self._cache[self._job]
  450.  
  451.  
  452.  
  453. class MapResult(ApplyResult):
  454.     
  455.     def __init__(self, cache, chunksize, length, callback):
  456.         ApplyResult.__init__(self, cache, callback)
  457.         self._success = True
  458.         self._value = [
  459.             None] * length
  460.         self._chunksize = chunksize
  461.         if chunksize <= 0:
  462.             self._number_left = 0
  463.             self._ready = True
  464.         else:
  465.             self._number_left = length // chunksize + bool(length % chunksize)
  466.  
  467.     
  468.     def _set(self, i, success_result):
  469.         (success, result) = success_result
  470.         if success:
  471.             self._value[i * self._chunksize:(i + 1) * self._chunksize] = result
  472.             self._number_left -= 1
  473.             if self._number_left == 0:
  474.                 if self._callback:
  475.                     self._callback(self._value)
  476.                 
  477.                 del self._cache[self._job]
  478.                 self._cond.acquire()
  479.                 
  480.                 try:
  481.                     self._ready = True
  482.                     self._cond.notify()
  483.                 finally:
  484.                     self._cond.release()
  485.  
  486.             
  487.         else:
  488.             self._success = False
  489.             self._value = result
  490.             del self._cache[self._job]
  491.             self._cond.acquire()
  492.             
  493.             try:
  494.                 self._ready = True
  495.                 self._cond.notify()
  496.             finally:
  497.                 self._cond.release()
  498.  
  499.  
  500.  
  501.  
  502. class IMapIterator(object):
  503.     
  504.     def __init__(self, cache):
  505.         self._cond = threading.Condition(threading.Lock())
  506.         self._job = job_counter.next()
  507.         self._cache = cache
  508.         self._items = collections.deque()
  509.         self._index = 0
  510.         self._length = None
  511.         self._unsorted = { }
  512.         cache[self._job] = self
  513.  
  514.     
  515.     def __iter__(self):
  516.         return self
  517.  
  518.     
  519.     def next(self, timeout = None):
  520.         self._cond.acquire()
  521.         
  522.         try:
  523.             item = self._items.popleft()
  524.         except IndexError:
  525.             if self._index == self._length:
  526.                 raise StopIteration
  527.             self._index == self._length
  528.             self._cond.wait(timeout)
  529.             
  530.             try:
  531.                 item = self._items.popleft()
  532.             except IndexError:
  533.                 if self._index == self._length:
  534.                     raise StopIteration
  535.                 self._index == self._length
  536.                 raise TimeoutError
  537.             except:
  538.                 None<EXCEPTION MATCH>IndexError
  539.             
  540.  
  541.             None<EXCEPTION MATCH>IndexError
  542.         finally:
  543.             self._cond.release()
  544.  
  545.         (success, value) = item
  546.         if success:
  547.             return value
  548.         raise value
  549.  
  550.     __next__ = next
  551.     
  552.     def _set(self, i, obj):
  553.         self._cond.acquire()
  554.         
  555.         try:
  556.             if self._index == i:
  557.                 self._items.append(obj)
  558.                 self._index += 1
  559.                 while self._index in self._unsorted:
  560.                     obj = self._unsorted.pop(self._index)
  561.                     self._items.append(obj)
  562.                     self._index += 1
  563.                     continue
  564.                     self
  565.                 self._cond.notify()
  566.             else:
  567.                 self._unsorted[i] = obj
  568.             if self._index == self._length:
  569.                 del self._cache[self._job]
  570.         finally:
  571.             self._cond.release()
  572.  
  573.  
  574.     
  575.     def _set_length(self, length):
  576.         self._cond.acquire()
  577.         
  578.         try:
  579.             self._length = length
  580.             if self._index == self._length:
  581.                 self._cond.notify()
  582.                 del self._cache[self._job]
  583.         finally:
  584.             self._cond.release()
  585.  
  586.  
  587.  
  588.  
  589. class IMapUnorderedIterator(IMapIterator):
  590.     
  591.     def _set(self, i, obj):
  592.         self._cond.acquire()
  593.         
  594.         try:
  595.             self._items.append(obj)
  596.             self._index += 1
  597.             self._cond.notify()
  598.             if self._index == self._length:
  599.                 del self._cache[self._job]
  600.         finally:
  601.             self._cond.release()
  602.  
  603.  
  604.  
  605.  
  606. class ThreadPool(Pool):
  607.     from dummy import Process
  608.     
  609.     def __init__(self, processes = None, initializer = None, initargs = ()):
  610.         Pool.__init__(self, processes, initializer, initargs)
  611.  
  612.     
  613.     def _setup_queues(self):
  614.         self._inqueue = Queue.Queue()
  615.         self._outqueue = Queue.Queue()
  616.         self._quick_put = self._inqueue.put
  617.         self._quick_get = self._outqueue.get
  618.  
  619.     
  620.     def _help_stuff_finish(inqueue, task_handler, size):
  621.         inqueue.not_empty.acquire()
  622.         
  623.         try:
  624.             inqueue.queue.clear()
  625.             inqueue.queue.extend([
  626.                 None] * size)
  627.             inqueue.not_empty.notify_all()
  628.         finally:
  629.             inqueue.not_empty.release()
  630.  
  631.  
  632.     _help_stuff_finish = staticmethod(_help_stuff_finish)
  633.  
  634.